Skip to content

Implement MCP Streamable HTTP Transport Protocol #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

tendant
Copy link

@tendant tendant commented Apr 30, 2025

Key Features

Implementation of the MCP Streamable HTTP transport specification

Implementation Details

Added server/streamable_http.go with the server-side implementation
Added comprehensive tests in server/streamable_http_test.go
Created example implementations:
    Full-featured server and client examples with notifications
    Minimal server and client examples for simpler use cases
Added detailed documentation in README-streamable-http.md

Summary by CodeRabbit

  • New Features

    • Introduced a Streamable HTTP server implementation supporting session management, event streaming (SSE), direct JSON responses, and resumability for the MCP protocol.
    • Added flexible configuration options for session IDs, event stores, and context customization.
    • Provided in-memory event storage and replay functionality.
  • Examples

    • Added runnable example programs for minimal and complete client-server interactions using Streamable HTTP transport, including notification handling and graceful shutdown.
  • Documentation

    • Added a comprehensive README detailing protocol usage, server/client setup, configuration, and example workflows.
  • Tests

    • Introduced a thorough test suite covering session initialization, SSE streaming, notification delivery, and session termination.

Copy link
Contributor

coderabbitai bot commented Apr 30, 2025

Walkthrough

This change introduces a complete implementation and documentation of the MCP Streamable HTTP transport in Go. It adds a new server-side transport (StreamableHTTPServer) with session management, event streaming via Server-Sent Events (SSE), resumability, and support for both JSON and SSE responses. An event store interface and in-memory implementation are provided for event replay. Multiple example programs demonstrate minimal and full-featured client-server usage, including session initialization, tool invocation, notification handling, and graceful shutdown. Comprehensive tests for the server are included. A detailed README documents protocol details, usage, and design considerations for both client and server implementations.

Changes

File(s) Change Summary
README-streamable-http.md Added a comprehensive README documenting the MCP Streamable HTTP transport implementation, protocol details, usage examples, and implementation notes.
server/streamable_http.go Introduced the StreamableHTTPServer implementation with session management, SSE streaming, event store interface and in-memory implementation, HTTP handlers, configuration options, and utility functions.
server/streamable_http_test.go Added a complete test suite for the Streamable HTTP server, covering initialization, SSE streaming, notification delivery, and session termination.
examples/minimal_client/main.go
examples/minimal_server/main.go
Added minimal client and server example programs demonstrating basic session initialization, tool invocation, and clean shutdown using the Streamable HTTP transport.
examples/streamable_http_client/main.go
examples/streamable_http_client_complete/main.go
Added example programs showing client usage of Streamable HTTP, including session initialization, notification handling, tool listing, tool calling, and graceful shutdown.
examples/streamable_http_server/main.go Added a standalone example of a Streamable HTTP server with an echo tool, notification sending, and graceful shutdown logic.

Possibly related PRs

  • mark3labs/mcp-go#168: Implements the client-side Streamable HTTP transport, which is directly referenced and demonstrated by the documentation and examples introduced in this PR.
  • mark3labs/mcp-go#179: Adds session and per-session tool management interfaces and implementations used by the Streamable HTTP server in this PR.
  • mark3labs/mcp-go#29: Implements an SSEServer HTTP handler, related to the StreamableHTTPServer’s HTTP handling of SSE streams in this PR.
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (17)
server/streamable_http.go (5)

424-429: Guard http.Flusher access

w.(http.Flusher) will panic if the underlying writer does not implement Flusher (e.g. certain test doubles). Always assert the capability first.

- w.(http.Flusher).Flush()
+ if flusher, ok := w.(http.Flusher); ok {
+     flusher.Flush()
+ }

508-555: Notification fan-out may silently drop messages

eventChan is buffered with capacity 10. When the buffer is full the fallback is to block; however the select defaults to returning on <-notifDone>.
If the client is slow, notifications will be lost without feedback.
Consider:

  • Using a larger buffer or an unbounded queue.
  • Back-pressure mechanisms (e.g. context cancellation).
  • Logging or propagating the overflow error.

748-759: event argument is unused in callers – clarify or remove

writeSSEEvent allows specifying an event: field but nothing in this file invokes it with a non-empty value. If custom event names are required, document and use them; otherwise drop the parameter to avoid confusion.


323-329: Errors from notification handling are discarded

server.HandleMessage may return an error for notifications (e.g. malformed data). Returning 202 Accepted unconditionally can hide failures.
At minimum, log the error and return 400/500 when appropriate.


829-845: Dead code – validateSession is never used

The helper looks correct but is not referenced anywhere. Remove it or wire it into handlePost/handleGet to ensure consistent validation.

examples/minimal_client/main.go (2)

24-26: Shared context may cancel long-running requests

Both the initialize and tools/call requests reuse the same 30 s context. If the first call consumes most of the timeout, the second may fail prematurely. Prefer deriving a fresh context per operation.

initCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
...
callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)

21-22: Ignore Close error

trans.Close() returns an error (according to transport interface) but it is discarded. Consider logging it to aid debugging of lingering connections.

examples/minimal_server/main.go (1)

64-69: Handle http.ErrServerClosed gracefully

When Shutdown is invoked the listener returns http.ErrServerClosed, which is expected and should not be treated as fatal.

-if err := streamableServer.Start(":8080"); err != nil {
-    log.Fatalf("Failed to start server: %v", err)
-}
+if err := streamableServer.Start(":8080"); err != nil && err != http.ErrServerClosed {
+    log.Fatalf("Failed to start server: %v", err)
+}
examples/streamable_http_client/main.go (1)

1-56: Well-structured example with clear flow and good practices

This example demonstrates a streamable HTTP client with all essential components: transport creation, notification handling, session initialization, and response display. The code follows Go best practices with proper error handling and resource cleanup.

Consider adding more detailed comments explaining the expected behavior and outputs at key points, especially for the notification handler. For instance, explaining what kind of notifications might be received and how they're structured would help users understand the example better.

examples/streamable_http_client_complete/main.go (1)

1-131: Comprehensive client example with complete MCP workflow

This example builds on the basic client by adding tool listing, tool invocation, and notification handling with signal management. The structure is logical and demonstrates a complete interaction flow with an MCP server.

Consider adding comments explaining the expected notification content from the echo tool, particularly in the notification handler setup (lines 27-31) or near the waiting section (lines 116-119). This would help users understand what to expect when running the example.

examples/streamable_http_server/main.go (1)

61-66: Consider adding context check in notification goroutine

The notification goroutine doesn't check if the context is still valid before sending the notification, which could lead to errors if the server is shutting down.

go func() {
	time.Sleep(1 * time.Second)
+	// Check if context is still valid before sending notification
+	select {
+	case <-ctx.Done():
+		return
+	default:
		mcpServer.SendNotificationToClient(ctx, "echo/notification", map[string]interface{}{
			"message": "Echo notification: " + message,
		})
+	}
}()
README-streamable-http.md (3)

22-25: Fix bullet point formatting

The bullet points in this section have loose punctuation marks that should be fixed for better readability.

### Key Components

- `StreamableHTTPServer`: The main server implementation that handles HTTP requests and responses
- `streamableHTTPSession`: Represents an active session with a client
- `EventStore`: Interface for storing and retrieving events for resumability
- `InMemoryEventStore`: A simple in-memory implementation of the EventStore interface
🧰 Tools
🪛 LanguageTool

[uncategorized] ~22-~22: Loose punctuation mark.
Context: ...Key Components - StreamableHTTPServer: The main server implementation that han...

(UNLIKELY_OPENING_PUNCTUATION)


34-37: Consider expanding the client implementation section

The client implementation section is quite brief compared to the server section. Adding more details about the client's key components, options, and design considerations would make the documentation more balanced.

Consider expanding this section to include:

  • Key client components
  • Client options (similar to server options section)
  • Design considerations specific to the client implementation

273-302: Add a troubleshooting section

The documentation would benefit from a troubleshooting section that addresses common issues users might encounter, such as connection problems, session expiration, or event replay failures.

Consider adding a "Troubleshooting" section that covers:

  • Common error scenarios and their solutions
  • Debugging tips (e.g., enabling verbose logging)
  • Best practices for error handling in both client and server implementations
server/streamable_http_test.go (3)

73-92: Consider flattening nested conditionals for better readability

The nested conditionals for validating the response structure could be simplified for better readability and easier debugging when tests fail.

- if result, ok := response["result"].(map[string]interface{}); ok {
-   if serverInfo, ok := result["serverInfo"].(map[string]interface{}); ok {
-     if serverInfo["name"] != "test-server" {
-       t.Errorf("Expected server name test-server, got %v", serverInfo["name"])
-     }
-     if serverInfo["version"] != "1.0.0" {
-       t.Errorf("Expected server version 1.0.0, got %v", serverInfo["version"])
-     }
-   } else {
-     t.Errorf("Expected serverInfo in result, got none")
-   }
- } else {
-   t.Errorf("Expected result in response, got none")
- }
+ result, ok := response["result"].(map[string]interface{})
+ if !ok {
+   t.Fatalf("Expected result in response, got none")
+ }
+ 
+ serverInfo, ok := result["serverInfo"].(map[string]interface{})
+ if !ok {
+   t.Fatalf("Expected serverInfo in result, got none")
+ }
+ 
+ if serverInfo["name"] != "test-server" {
+   t.Errorf("Expected server name test-server, got %v", serverInfo["name"])
+ }
+ 
+ if serverInfo["version"] != "1.0.0" {
+   t.Errorf("Expected server version 1.0.0, got %v", serverInfo["version"])
+ }

257-257: Replace hard-coded sleep with a more reliable mechanism

Using a fixed sleep duration can lead to flaky tests on different environments or under different load conditions.

Consider using a more robust synchronization mechanism or at least making the sleep duration configurable:

- // Wait a bit for the stream to be established
- time.Sleep(100 * time.Millisecond)
+ // Wait for the stream to be established
+ streamEstablishmentTimeout := 200 * time.Millisecond
+ time.Sleep(streamEstablishmentTimeout)

For even better reliability, you could implement a ready signal from the server once the stream is established.


293-312: Implement more robust SSE event parsing

The current SSE event parsing is simplified and assumes a specific format. A more robust implementation would handle multiple data lines, event IDs, and other SSE features.

Consider implementing a more comprehensive SSE parser or using the same timeout pattern recommended for the previous event reading loop.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df73667 and 5f5303c.

📒 Files selected for processing (8)
  • README-streamable-http.md (1 hunks)
  • examples/minimal_client/main.go (1 hunks)
  • examples/minimal_server/main.go (1 hunks)
  • examples/streamable_http_client/main.go (1 hunks)
  • examples/streamable_http_client_complete/main.go (1 hunks)
  • examples/streamable_http_server/main.go (1 hunks)
  • server/streamable_http.go (1 hunks)
  • server/streamable_http_test.go (1 hunks)
🧰 Additional context used
🪛 LanguageTool
README-streamable-http.md

[uncategorized] ~22-~22: Loose punctuation mark.
Context: ...Key Components - StreamableHTTPServer: The main server implementation that han...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~29-~29: Loose punctuation mark.
Context: ...rver Options - WithSessionIDGenerator: Sets a custom session ID generator - `W...

(UNLIKELY_OPENING_PUNCTUATION)

🔇 Additional comments (6)
server/streamable_http.go (1)

448-457: 🛠️ Refactor suggestion

Deferred close order can panic – potential send-on-closed-channel

defer close(eventChan) is registered before defer close(notifDone).
If the goroutine is still attempting to send on eventChan when handleSSEResponse returns, closing eventChan first can trigger a panic.
Reverse the order or signal the goroutine before the channel is closed.

- defer close(eventChan)
- ...
- notifDone := make(chan struct{})
- defer close(notifDone)
+ notifDone := make(chan struct{})
+ defer close(notifDone)   // signal first
+ defer close(eventChan)   // close after goroutine stops

Likely an incorrect or invalid review comment.

examples/streamable_http_client_complete/main.go (1)

120-130: Good signal handling pattern for graceful termination

The signal handling implementation with a select statement provides a clean way to wait for either notifications or termination signals.

examples/streamable_http_server/main.go (1)

1-97: Well-structured server example with proper lifecycle management

The example demonstrates a complete MCP server with streamable HTTP transport, including tool registration, async notification handling, and graceful shutdown. The code follows good practices for server implementation in Go.

README-streamable-http.md (1)

1-302: Comprehensive and well-structured documentation

The README provides thorough documentation of the MCP Streamable HTTP implementation, covering both server and client sides, with clear examples and protocol details. It aligns well with the provided example code and offers valuable context for users.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~22-~22: Loose punctuation mark.
Context: ...Key Components - StreamableHTTPServer: The main server implementation that han...

(UNLIKELY_OPENING_PUNCTUATION)


[uncategorized] ~29-~29: Loose punctuation mark.
Context: ...rver Options - WithSessionIDGenerator: Sets a custom session ID generator - `W...

(UNLIKELY_OPENING_PUNCTUATION)

server/streamable_http_test.go (2)

16-33: LGTM - Test setup looks good

The initial setup is clear and concise, properly initializing the MCP server with required capabilities, configuring the StreamableHTTPServer, and creating a test HTTP server.


336-401: Session termination test is well structured

The session termination test effectively verifies both the successful termination of a session and the expected behavior when attempting to use a terminated session.

@ezynda3
Copy link
Contributor

ezynda3 commented May 1, 2025

This is great. One thing though. We recently merged some code around session management so that should be used in this PR as well. https://github.com/mark3labs/mcp-go/blob/main/server/session.go

)

// streamableHTTPSession represents an active Streamable HTTP connection.
type streamableHTTPSession struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should implement SessionWithTools as shown here https://github.com/mark3labs/mcp-go/blob/main/server/session.go#L22-L31

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #232 where I implemented it for SSEServer

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, Please help to review again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems there is a race condition now

@Gelembjuk
Copy link
Contributor

+1

Thanks for your work. I am waiting this is approved to implement my idea where http streaming would be the best option

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
server/streamable_http_test.go (1)

160-178: Add timeout to prevent test hanging.

The event reading loop lacks a timeout mechanism which could cause the test to hang indefinitely if there's an issue with the event stream.

server/streamable_http.go (1)

391-392: ⚠️ Potential issue

Fix redundant check for stateless mode.

This check has the same issue as the previous one - s.sessionIDGenerator() will never return an empty string.

- // Only create a session if we're not in stateless mode
- if s.sessionIDGenerator() != "" {
+ // Only create a session if we're not in stateless mode
+ if !s.stateless {
🧹 Nitpick comments (8)
server/streamable_http_test.go (2)

355-356: Simplify string creation by avoiding unnecessary fmt.Sprintf.

The fmt.Sprintf is unnecessary when the entire string is static.

- rawNotification := fmt.Sprintf(`{"jsonrpc":"2.0","method":"test/notification","params":{"message":"Hello, world!"}}`)
+ rawNotification := `{"jsonrpc":"2.0","method":"test/notification","params":{"message":"Hello, world!"}}`
🧰 Tools
🪛 golangci-lint (1.64.8)

355-355: S1039: unnecessary use of fmt.Sprintf

(gosimple)


356-370: Consider removing redundant manual notification handling.

This code creates a fallback notification in case the actual notification doesn't have the expected format, but it's unnecessary if the notification format is consistent. The test should verify what was actually sent rather than creating a second fallback notification.

- // Create a notification with the correct format for testing
- rawNotification := fmt.Sprintf(`{"jsonrpc":"2.0","method":"test/notification","params":{"message":"Hello, world!"}}`)
-
- // Parse the raw notification
- var manualNotification map[string]interface{}
- if err := json.Unmarshal([]byte(rawNotification), &manualNotification); err != nil {
-   t.Fatalf("Failed to decode manual notification: %v", err)
- }
-
- // Check if message exists in params
- message, ok := params["message"]
- if !ok {
-   // If message doesn't exist in params, use the manual notification for testing
-   manualParams := manualNotification["params"].(map[string]interface{})
-   message = manualParams["message"]
-   t.Logf("Using manual notification for testing")
- }
+ // Check if message exists in params
+ message, ok := params["message"]
+ if !ok {
+   t.Errorf("Expected message in params, but not found")
+   return
+ }
server/streamable_http.go (6)

22-22: Remove unused field lastEventID.

The lastEventID field is declared but never used in the code.

type streamableHTTPSession struct {
	sessionID           string
	notificationChannel chan mcp.JSONRPCNotification
	initialized         atomic.Bool
-	lastEventID         string
	eventStore          EventStore
	sessionTools        sync.Map // Maps tool name to ServerTool
}
🧰 Tools
🪛 golangci-lint (1.64.8)

22-22: field lastEventID is unused

(unused)


214-214: Remove unused field streamMapping.

The streamMapping field is declared but only used in the unused writeSSEEvent method.

type StreamableHTTPServer struct {
	server             *MCPServer
	baseURL            string
	basePath           string
	endpoint           string
	sessions           sync.Map // Maps sessionID to ClientSession
	srv                *http.Server
	contextFunc        SSEContextFunc
	sessionIDGenerator func() string
	enableJSONResponse bool
	eventStore         EventStore
	standaloneStreamID string
-	streamMapping      sync.Map // Maps streamID to response writer
	requestToStreamMap sync.Map // Maps requestID to streamID
}
🧰 Tools
🪛 golangci-lint (1.64.8)

214-214: field streamMapping is unused

(unused)


683-711: Remove unused method writeSSEEvent.

This method is never called in the codebase. It appears to be leftover code from an earlier implementation approach.

- // writeSSEEvent writes an SSE event to the given stream
- func (s *StreamableHTTPServer) writeSSEEvent(streamID string, event string, message mcp.JSONRPCMessage) error {
-	// Get the stream channel
-	streamChanI, ok := s.streamMapping.Load(streamID)
-	if !ok {
-		return fmt.Errorf("stream not found: %s", streamID)
-	}
-
-	streamChan, ok := streamChanI.(chan string)
-	if !ok {
-		return fmt.Errorf("invalid stream channel type")
-	}
-
-	// Marshal the message
-	data, err := json.Marshal(message)
-	if err != nil {
-		return err
-	}
-
-	// Create the event data
-	eventData := fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)
-
-	// Send the event to the channel
-	select {
-	case streamChan <- eventData:
-		return nil
-	default:
-		return fmt.Errorf("stream channel full")
-	}
- }
🧰 Tools
🪛 golangci-lint (1.64.8)

683-683: func (*StreamableHTTPServer).writeSSEEvent is unused

(unused)


763-778: Remove unused method validateSession.

This method is defined but never called in the codebase.

- // validateSession checks if the session ID is valid and the session is initialized
- func (s *StreamableHTTPServer) validateSession(sessionID string) bool {
-	// Check if the session ID is valid
-	if sessionID == "" {
-		return false
-	}
-
-	// Check if the session exists
-	if sessionValue, ok := s.sessions.Load(sessionID); ok {
-		// Check if the session is initialized
-		if httpSession, ok := sessionValue.(*streamableHTTPSession); ok {
-			return httpSession.Initialized()
-		}
-	}
-
-	return false
- }
🧰 Tools
🪛 golangci-lint (1.64.8)

763-763: func (*StreamableHTTPServer).validateSession is unused

(unused)


496-497: Use structured logging instead of fmt.Printf.

Direct use of fmt.Printf for error logging is not ideal for a library. Consider using a proper logging interface or returning errors to the caller.

- // Log the error but continue
- fmt.Printf("Error replaying events: %v\n", err)
+ // If the server has a logger, use it
+ if s.server.logger != nil {
+     s.server.logger.Printf("Error replaying events: %v", err)
+ }

Apply similar changes to other instances of fmt.Printf throughout the code.


621-622: Simplify string creation by avoiding unnecessary fmt.Sprintf.

The fmt.Sprintf is unnecessary when the entire string is static.

- initialEvent := fmt.Sprintf("data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n")
+ initialEvent := "data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n"
🧰 Tools
🪛 golangci-lint (1.64.8)

621-621: S1039: unnecessary use of fmt.Sprintf

(gosimple)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f5303c and 4e91225.

📒 Files selected for processing (2)
  • server/streamable_http.go (1 hunks)
  • server/streamable_http_test.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
server/streamable_http.go (4)
mcp/types.go (4)
  • JSONRPCNotification (206-209)
  • JSONRPCMessage (89-89)
  • Request (103-116)
  • JSONRPCError (219-232)
server/server.go (1)
  • ServerTool (50-53)
server/session.go (2)
  • ClientSession (11-20)
  • SessionWithTools (23-31)
server/sse.go (1)
  • SSEContextFunc (37-37)
🪛 golangci-lint (1.64.8)
server/streamable_http.go

22-22: field lastEventID is unused

(unused)


214-214: field streamMapping is unused

(unused)


683-683: func (*StreamableHTTPServer).writeSSEEvent is unused

(unused)


763-763: func (*StreamableHTTPServer).validateSession is unused

(unused)


621-621: S1039: unnecessary use of fmt.Sprintf

(gosimple)

server/streamable_http_test.go

355-355: S1039: unnecessary use of fmt.Sprintf

(gosimple)

🔇 Additional comments (4)
server/streamable_http_test.go (1)

324-332: LGTM! Good use of timeout and channel select pattern.

The code efficiently waits for the notification to be read with a reasonable timeout, preventing test hangs.

server/streamable_http.go (3)

18-25: Implement SessionWithTools interface as requested in PR comments.

Good implementation of the SessionWithTools interface for the streamable HTTP session. This addresses the comment from reviewer 'ezynda3' about implementing the interface from server/session.go.

🧰 Tools
🪛 golangci-lint (1.64.8)

22-22: field lastEventID is unused

(unused)


401-401: Good implementation of session initialization.

The code correctly initializes new sessions, which addresses a past review comment about sessions never being marked as initialized.


43-66: LGTM! Clean implementation of SessionWithTools interface.

The implementation of GetSessionTools and SetSessionTools is thread-safe using a sync.Map and properly handles tool mapping conversions.

Comment on lines +294 to +295
time.Sleep(500 * time.Millisecond)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use a more robust approach instead of fixed delay.

Using a fixed sleep duration could lead to flaky tests on slower systems or when the system is under load.

- // Give a small delay to ensure the notification is processed and flushed
- time.Sleep(500 * time.Millisecond)
+ // Use a more robust approach with polling and timeout
+ const maxRetries = 10
+ const retryInterval = 100 * time.Millisecond
+ notificationSent := make(chan struct{})
+ go func() {
+   // Signal that the notification has been sent
+   notificationSent <- struct{}{}
+ }()
+ // Wait for notification to be sent with timeout
+ select {
+ case <-notificationSent:
+   // Notification sent successfully, continue
+ case <-time.After(2 * time.Second):
+   t.Fatalf("Timeout waiting for notification to be sent")
+ }

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (8)
server/streamable_http.go (8)

220-223: Remove unused field streamMapping

The streamMapping field is only used in the writeSSEEvent method, which itself is unused according to static analysis.

	standaloneStreamID string
-	streamMapping      sync.Map // Maps streamID to response writer
	requestToStreamMap sync.Map // Maps requestID to streamID
	statelessMode      bool
🧰 Tools
🪛 golangci-lint (1.64.8)

221-221: field streamMapping is unused

(unused)


628-629: Simplify string formatting

Use a simple string literal instead of fmt.Sprintf when no formatting is needed.

-	initialEvent := fmt.Sprintf("data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n")
+	initialEvent := "data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n"
🧰 Tools
🪛 golangci-lint (1.64.8)

628-628: S1039: unnecessary use of fmt.Sprintf

(gosimple)


690-718: Remove unused writeSSEEvent method

The writeSSEEvent method is never called in the codebase. Consider removing it unless it's intended for future use.

-// writeSSEEvent writes an SSE event to the given stream
-func (s *StreamableHTTPServer) writeSSEEvent(streamID string, event string, message mcp.JSONRPCMessage) error {
-	// Get the stream channel
-	streamChanI, ok := s.streamMapping.Load(streamID)
-	if !ok {
-		return fmt.Errorf("stream not found: %s", streamID)
-	}
-
-	streamChan, ok := streamChanI.(chan string)
-	if !ok {
-		return fmt.Errorf("invalid stream channel type")
-	}
-
-	// Marshal the message
-	data, err := json.Marshal(message)
-	if err != nil {
-		return err
-	}
-
-	// Create the event data
-	eventData := fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)
-
-	// Send the event to the channel
-	select {
-	case streamChan <- eventData:
-		return nil
-	default:
-		return fmt.Errorf("stream channel full")
-	}
-}
🧰 Tools
🪛 golangci-lint (1.64.8)

690-690: func (*StreamableHTTPServer).writeSSEEvent is unused

(unused)


770-785: Remove unused validateSession method

The validateSession method is never called in the codebase. Consider removing it unless it's intended for future use.

-// validateSession checks if the session ID is valid and the session is initialized
-func (s *StreamableHTTPServer) validateSession(sessionID string) bool {
-	// Check if the session ID is valid
-	if sessionID == "" {
-		return false
-	}
-
-	// Check if the session exists
-	if sessionValue, ok := s.sessions.Load(sessionID); ok {
-		// Check if the session is initialized
-		if httpSession, ok := sessionValue.(*streamableHTTPSession); ok {
-			return httpSession.Initialized()
-		}
-	}
-
-	return false
-}
🧰 Tools
🪛 golangci-lint (1.64.8)

770-770: func (*StreamableHTTPServer).validateSession is unused

(unused)


22-22: Remove unused field lastEventID

The lastEventID field in streamableHTTPSession is declared but never used.

	notificationChannel chan mcp.JSONRPCNotification
	initialized         atomic.Bool
-	lastEventID         string
	eventStore          EventStore
🧰 Tools
🪛 golangci-lint (1.64.8)

22-22: field lastEventID is unused

(unused)


502-504: Add structured logging instead of fmt.Printf

Replace fmt.Printf with a proper logging mechanism for error reporting.

-		// Log the error but continue
-		fmt.Printf("Error replaying events: %v\n", err)
+		// Log the error but continue
+		// Consider using a proper logger like logrus or zap
+		// Or if you prefer to keep it simple:
+		log.Printf("Error replaying events: %v", err)

522-523: Add structured logging instead of fmt.Printf

Similar to the previous comment, replace fmt.Printf with a proper logging mechanism.

-		// Log the error but continue
-		fmt.Printf("Error storing event: %v\n", storeErr)
+		// Log the error but continue
+		// Consider using a proper logger like logrus or zap
+		// Or if you prefer to keep it simple:
+		log.Printf("Error storing event: %v", storeErr)

559-560: Add structured logging instead of fmt.Printf

Again, replace fmt.Printf with a proper logging mechanism.

-		// Log the error but continue
-		fmt.Printf("Error storing event: %v\n", storeErr)
+		// Log the error but continue
+		// Consider using a proper logger like logrus or zap
+		// Or if you prefer to keep it simple:
+		log.Printf("Error storing event: %v", storeErr)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e91225 and e160f19.

📒 Files selected for processing (1)
  • server/streamable_http.go (1 hunks)
🧰 Additional context used
🪛 golangci-lint (1.64.8)
server/streamable_http.go

22-22: field lastEventID is unused

(unused)


221-221: field streamMapping is unused

(unused)


690-690: func (*StreamableHTTPServer).writeSSEEvent is unused

(unused)


770-770: func (*StreamableHTTPServer).validateSession is unused

(unused)


628-628: S1039: unnecessary use of fmt.Sprintf

(gosimple)

🔇 Additional comments (6)
server/streamable_http.go (6)

17-70: Implementation satisfies the SessionWithTools interface correctly.

The session struct and methods are well-implemented. This appropriately addresses the requirements from previous comments to implement the SessionWithTools interface.

🧰 Tools
🪛 golangci-lint (1.64.8)

22-22: field lastEventID is unused

(unused)


72-166: Good implementation of the event store for resumability.

The EventStore interface and InMemoryEventStore implementation provide a solid foundation for event storage and replay, which is essential for session resumability. The implementation correctly uses mutex locking for thread safety and has proper error handling.


168-206: Well-structured configuration options using the functional options pattern.

The server configuration options are well-designed using the functional options pattern, which provides flexibility and clear intent when configuring the server.


387-390: Correctly implemented stateless mode check.

The stateless mode check has been properly implemented, fixing a previous issue where the check was using s.sessionIDGenerator() != "", which would never work correctly.


407-409: Session is now correctly initialized.

The session is now properly initialized before being stored and registered, addressing a previous issue where sessions were never marked as initialized.


1-787: Overall implementation looks solid with good attention to detail.

The implementation of the Streamable HTTP transport protocol is well-structured and covers all the key aspects including session management, event streaming, resumability, and proper HTTP protocol handling. A few minor improvements have been suggested, but the core functionality is robust.

🧰 Tools
🪛 golangci-lint (1.64.8)

22-22: field lastEventID is unused

(unused)


221-221: field streamMapping is unused

(unused)


690-690: func (*StreamableHTTPServer).writeSSEEvent is unused

(unused)


770-770: func (*StreamableHTTPServer).validateSession is unused

(unused)


628-628: S1039: unnecessary use of fmt.Sprintf

(gosimple)

Copy link
Contributor

@robert-jackson-glean robert-jackson-glean left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this @tendant!

There are a number of features in SSEServer that are not present in the new StreamableHTTPServer that I think we should add:

// Keep-alive support
WithKeepAliveInterval(keepAliveInterval time.Duration)
WithKeepAlive(keepAlive bool)

// Dynamic path support
WithDynamicBasePath(fn DynamicBasePathFunc)

// Explicit handler mounting
func (s *SSEServer) SSEHandler() http.Handler
func (s *SSEServer) MessageHandler() http.Handler

These were added in #80 + #169 (for Ping support) and #214 (for dynamic path support) IIRC (if it helps to review them independently).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants